Spark notebook

This notebook will only work in a Jupyter session running on mathmadslinux2p.

You can start your own Jupyter session on mathmadslinux2p and open this notebook in Chrome on the MADS Windows server by following the steps below.

Steps

  1. Login to the MADS Windows server using https://mathportal.canterbury.ac.nz/.
  2. Download or copy this notebook to your home directory.
  3. Open powershell and run ssh mathmadslinux2p.
  4. Run start_pyspark_notebook or /opt/anaconda3/bin/jupyter-notebook --ip 132.181.129.68 --port $((8000 + $((RANDOM % 999)))).
  5. Copy / paste the url provided in the shell window into Chrome on the MADS Windows server.
  6. Open the notebook from the Jupyter root directory (which is your home directory).
  7. Run start_spark() to start a spark session in the notebook.
  8. Run stop_spark() before closing the notebook or kill your spark application by hand using the link in the Spark UI.
In [13]:
# Run this cell to import pyspark and to define start_spark() and stop_spark()

import findspark

findspark.init()

import getpass
import pandas
import pyspark
import random
import re

from IPython.display import display, HTML
from pyspark import SparkContext
from pyspark.sql import SparkSession


# Functions used below

def username():
    """Return the login name with any trailing @domain suffix stripped.
    """

    raw = getpass.getuser()
    return re.sub(r'@.*', '', raw)


def dict_to_html(d):
    """Convert a Python dictionary into a two column table for display.

    Args:
        d (dict): mapping to render; keys and values are interpolated with str().

    Returns:
        str: HTML for a full-width, monospace, two column table (one row per item).
    """

    # Constant strings do not need f-prefixes; only the row template interpolates.
    rows = ''.join(
        f'<tr><td style="text-align:left;">{k}</td><td>{v}</td></tr>'
        for k, v in d.items()
    )

    return f'<table width="100%" style="width:100%; font-family: monospace;">{rows}</table>'


def show_as_html(df, n=20):
    """Leverage existing pandas jupyter integration to show a spark dataframe as html.

    Collects only the first n rows to the driver, so it is safe on large frames.

    Args:
        df (pyspark.sql.DataFrame): dataframe to display
        n (int): number of rows to show (default: 20)
    """

    display(df.limit(n).toPandas())

    
def display_spark():
    """Display the status of the active Spark session if one is currently running.

    When the globals ``spark`` and ``sc`` exist, renders links to the Spark UI
    and the application UI plus the full session config; otherwise renders a
    "stopped" notice. Output goes to the notebook via IPython display().
    """

    if 'spark' in globals() and 'sc' in globals():

        name = sc.getConf().get("spark.app.name")

        # f-prefixes kept only on strings that actually interpolate values.
        html = [
            '<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:green">active</span></b>, look for <code>{name}</code> under the running applications section in the Spark UI.</p>',
            '<ul>',
            '<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            f'<li><a href="{sc.uiWebUrl}" target="_blank">Spark Application UI</a></li>',
            '</ul>',
            '<p><b>Config</b></p>',
            dict_to_html(dict(sc.getConf().getAll())),
            '<p><b>Notes</b></p>',
            '<ul>',
            '<li>The spark session <code>spark</code> and spark context <code>sc</code> global variables have been defined by <code>start_spark()</code>.</li>',
            f'<li>Please run <code>stop_spark()</code> before closing the notebook or restarting the kernel or kill <code>{name}</code> by hand using the link in the Spark UI.</li>',
            '</ul>',
        ]
        display(HTML(''.join(html)))

    else:

        html = [
            '<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:red">stopped</span></b>, confirm that <code>{username() + " (jupyter)"}</code> is under the completed applications section in the Spark UI.</p>',
            '<ul>',
            '<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            '</ul>',
        ]
        display(HTML(''.join(html)))


# Functions to start and stop spark

def start_spark(executor_instances=2, executor_cores=1, worker_memory=1, master_memory=1):
    """Start a new Spark session and define globals for SparkSession (spark) and SparkContext (sc).

    Args:
        executor_instances (int): number of executors (default: 2)
        executor_cores (int): number of cores per executor (default: 1)
        worker_memory (float): worker memory in GB (default: 1)
        master_memory (float): master memory in GB (default: 1)
    """

    global spark
    global sc

    user = username()

    cores = executor_instances * executor_cores
    # Rule of thumb: ~4 shuffle partitions per available core.
    partitions = cores * 4
    # Randomise the UI port so concurrent users on the host don't collide.
    port = 4000 + random.randint(1, 999)

    spark = (
        SparkSession.builder
        .master("spark://masternode2:7077")
        .config("spark.driver.extraJavaOptions", f"-Dderby.system.home=/tmp/{user}/spark/")
        .config("spark.dynamicAllocation.enabled", "false")
        .config("spark.executor.instances", str(executor_instances))
        .config("spark.executor.cores", str(executor_cores))
        .config("spark.cores.max", str(cores))
        .config("spark.executor.memory", f"{worker_memory}g")
        .config("spark.driver.memory", f"{master_memory}g")
        .config("spark.driver.maxResultSize", "0")
        .config("spark.sql.shuffle.partitions", str(partitions))
        .config("spark.ui.port", str(port))
        .appName(user + " (jupyter)")
        .getOrCreate()
    )
    # Take the context directly from the session just created; equivalent to
    # SparkContext.getOrCreate() but consistent with how the rest of the
    # notebook accesses it (spark.sparkContext).
    sc = spark.sparkContext

    display_spark()

    
def stop_spark():
    """Stop the active Spark session and delete globals for SparkSession (spark) and SparkContext (sc).
    """

    global spark
    global sc

    # Only tear down when start_spark() has actually defined both globals.
    session_active = 'spark' in globals() and 'sc' in globals()
    if session_active:
        spark.stop()
        del spark
        del sc

    display_spark()


# Make css changes to improve spark output readability:
# keep <pre> output unwrapped, stop dataframe cells wrapping, and hide the
# pandas index column.

css_rules = [
    'pre { white-space: pre !important; }',
    'table.dataframe td { white-space: nowrap !important; }',
    'table.dataframe thead th:first-child, table.dataframe tbody th { display: none; }',
]
html = ['<style>'] + css_rules + ['</style>']
display(HTML(''.join(html)))

Example notebook

The code below provides a template for how you would use a notebook to start spark, run some code, and then stop spark.

Steps

  • Run start_spark() to start a spark session in the notebook (only change the default resources when advised to do so for an exercise or assignment)
  • Write and run code interactively, creating additional cells as needed.
  • Run stop_spark() before closing the notebook or kill your spark application by hand using the link in the Spark UI.
In [14]:
# Run this cell to start a spark session in this notebook
# NOTE(review): this requests 4 executors x 2 cores x 4 GB (8 cores total),
# well above the defaults — the notebook header says to change resources only
# when advised for an exercise/assignment; confirm this allocation is intended.

start_spark(executor_instances=4, executor_cores=2, worker_memory=4, master_memory=4)

Spark

The spark session is active, look for sjb131 (jupyter) under the running applications section in the Spark UI.

Config

spark.dynamicAllocation.enabled     false
spark.executor.instances            4
spark.driver.memory                 4g
spark.executor.memory               4g
spark.master                        spark://masternode2:7077
spark.executor.id                   driver
spark.sql.warehouse.dir             file:/users/home/sjb131/Assignment%20One/spark-warehouse
spark.executor.cores                2
spark.driver.host                   mathmadslinux2p.canterbury.ac.nz
spark.sql.shuffle.partitions        32
spark.driver.extraJavaOptions       -Dderby.system.home=/tmp/sjb131/spark/
spark.app.name                      sjb131 (jupyter)
spark.rdd.compress                  True
spark.ui.port                       4224
spark.app.id                        app-20240502065210-0641
spark.serializer.objectStreamReset  100
spark.driver.maxResultSize          0
spark.cores.max                     8
spark.submit.pyFiles                (empty)
spark.submit.deployMode             client
spark.app.startTime                 1714589530145
spark.driver.port                   40441
spark.ui.showConsoleProgress        true

Notes

  • The spark session spark and spark context sc global variables have been defined by start_spark().
  • Please run stop_spark() before closing the notebook or restarting the kernel or kill sjb131 (jupyter) by hand using the link in the Spark UI.
In [15]:
import sys

# Make user-installed packages (iso3166, joshuaproject, plotly, ...) visible
# to this kernel.
# NOTE(review): this is a Windows AppData path, but the kernel runs on the
# Linux host mathmadslinux2p — the path will not exist there, so this append
# is likely a no-op; confirm whether a Linux site-packages path was intended.
sys.path.append(r'C:\Users\sjb131\AppData\Roaming\Python\Python311\site-packages')
In [16]:
from iso3166 import countries as iso_country_codes
from joshuaproject.countrycodes import CountryCodes as fips_country_codes
from pyspark.sql import functions as F
from pyspark.sql.types import *
import matplotlib.pyplot as plt
import plotly.express as px
import plotly.graph_objects as go
import pandas as pd
import os
In [17]:
# Load the station and country dimension tables produced earlier in the
# assignment pipeline, previewing the first rows of each.
stations = spark.read.parquet("hdfs:///user/sjb131/outputs/ghcnd/ghcnd-stations.parquet")
show_as_html(stations, 10)
countries = spark.read.parquet("hdfs:///user/sjb131/outputs/ghcnd/ghcnd-countries.parquet")
show_as_html(countries, 10)

# Explicit schema for the per-country, per-year rainfall summary CSV.
# NOTE(review): rainfall values appear to be in GHCN native units (tenths of
# mm, divided by 10 further below) — confirm against the upstream job.
schema_rainfall = StructType([
    StructField("year", StringType(), True),
    StructField("code", StringType(), True),
    StructField("total_rainfall", DoubleType(), True),
    StructField("station_count", IntegerType(), True),
    StructField("data_point_count", IntegerType(), True),
    StructField("expected_data_points", IntegerType(), True),
    StructField("average_rainfall", DoubleType(), True),
    StructField("scaling_factor", DoubleType(), True),
    StructField("scaled_avg_rainfall", DoubleType(), True),
])

rainfall = spark.read.csv("hdfs:///user/sjb131/outputs/ghcnd/rainfall.csv", schema=schema_rainfall)
show_as_html(rainfall, 10)

# Explicit schema for the New Zealand daily-observations extract.
schema_daily = StructType([
    StructField("id", StringType(), True),
    StructField("date", StringType(), True),
    StructField("element", StringType(), True),
    StructField("value", DoubleType(), True),
    StructField("measurement_flag", StringType(), True),
    StructField("quality_flag", StringType(), True),
    StructField("source_flag", StringType(), True),
    StructField("observation_time", StringType(), True),
    StructField("station_name", StringType(), True),
])

nz_daily = spark.read.csv("hdfs:///user/sjb131/outputs/ghcnd/nz_daily.csv", schema=schema_daily)
# Dates arrive as yyyyMMdd strings; convert to DateType for later grouping.
nz_daily = nz_daily.withColumn('date', F.to_date('date', 'yyyyMMdd'))

show_as_html(nz_daily, 10)
id latitude longitude elevation state name gsn_flag hcncrn_flag wmo_id station_country_code first_year last_year element_count core other prcp_count
0 AE000041196 25.333 55.5170 34.0 SHARJAH INTER. AIRP GSN 41196 AE 1944 2024 4 3 1 1
1 AEM00041218 24.262 55.6090 264.9 AL AIN INTL 41218 AE 1994 2024 4 3 1 1
2 AGE00147715 35.420 8.1197 863.0 TEBESSA AG 1879 1938 3 3 0 1
3 AGE00147794 36.780 5.1000 225.0 BEJAIA-CAP CARBON AG 1926 1938 2 2 0 0
4 AGM00060402 36.712 5.0700 6.1 SOUMMAM 60402 AG 1973 2024 5 4 1 1
5 AGM00060430 36.300 2.2330 721.0 MILIANA 60430 AG 1957 2024 5 4 1 1
6 AGM00060461 35.700 -0.6500 22.0 ORAN-PORT 60461 AG 1995 2024 4 3 1 1
7 AGM00060514 35.167 2.3170 801.0 KSAR CHELLALA 60514 AG 1995 2024 5 4 1 1
8 AGM00060515 35.333 4.2060 459.0 BOU SAADA 60515 AG 1984 2024 4 3 1 1
9 AGM00060550 33.667 1.0000 1347.0 EL-BAYADH 60550 AG 1973 2024 5 4 1 1
code name
0 AC Antigua and Barbuda
1 AE United Arab Emirates
2 AF Afghanistan
3 AG Algeria
4 AJ Azerbaijan
5 AL Albania
6 AM Armenia
7 AO Angola
8 AQ American Samoa [United States]
9 AR Argentina
year code total_rainfall station_count data_point_count expected_data_points average_rainfall scaling_factor scaled_avg_rainfall
0 1805 GM 5876.0 1 365 365 5876.0 1.000000 5876.000000
1 1810 GM 5744.0 1 363 365 5744.0 0.994521 5775.647383
2 1813 GM 7526.0 1 100 365 7526.0 0.273973 27469.900000
3 1816 EZ 4966.0 1 366 366 4966.0 1.000000 4966.000000
4 1823 EZ 4438.0 1 365 365 4438.0 1.000000 4438.000000
5 1834 UK 5413.0 1 365 365 5413.0 1.000000 5413.000000
6 1837 EZ 6349.0 1 365 365 6349.0 1.000000 6349.000000
7 1838 EZ 5417.0 1 365 365 5417.0 1.000000 5417.000000
8 1841 EZ 4906.0 1 365 365 4906.0 1.000000 4906.000000
9 1842 SW 3755.0 1 365 365 3755.0 1.000000 3755.000000
id date element value measurement_flag quality_flag source_flag observation_time station_name
0 NZ000093012 2003-01-01 TMAX 247.0 None None G None KAITAIA
1 NZ000093012 2003-01-01 TMIN 142.0 None None G None KAITAIA
2 NZ000093292 2003-01-01 TMAX 252.0 None None G None GISBORNE AERODROME
3 NZ000093292 2003-01-01 TMIN 105.0 None None G None GISBORNE AERODROME
4 NZ000093417 2003-01-01 TMAX 215.0 None None G None PARAPARAUMU AWS
5 NZ000093417 2003-01-01 TMIN 109.0 None None G None PARAPARAUMU AWS
6 NZ000093844 2003-01-01 TMAX 192.0 None None G None INVERCARGILL AIRPOR
7 NZ000093844 2003-01-01 TMIN 59.0 None None G None INVERCARGILL AIRPOR
8 NZ000093994 2003-01-01 TMAX 242.0 None None G None RAOUL ISL/KERMADEC
9 NZ000093994 2003-01-01 TMIN 210.0 None None G None RAOUL ISL/KERMADEC
In [18]:
# Build a FIPS -> ISO alpha-2 country-code lookup from the joshuaproject
# table and broadcast it so executors can use it inside a UDF.
fips_codes = fips_country_codes()
fips_to_iso_mapping = {row.FIPS: row.ISO for row in fips_codes}
broadcast_fips_to_iso_mapping = spark.sparkContext.broadcast(fips_to_iso_mapping)
In [19]:
        
def convert_fips_to_iso(fips_code):
    """Map a FIPS country code to its ISO alpha-2 code; None when unknown."""
    return broadcast_fips_to_iso_mapping.value.get(fips_code)

# Register the function as a UDF (default StringType return)
convert_fips_to_iso_udf = F.udf(convert_fips_to_iso)
In [20]:
# ISO alpha-2 -> alpha-3 lookup from the iso3166 package, broadcast for UDF use
# (plotly's choropleth wants alpha-3 location codes).
alpha2_to_alpha3_mapping = {country.alpha2: country.alpha3 for country in iso_country_codes}
broadcast_alpha2_to_alpha3_mapping = spark.sparkContext.broadcast(alpha2_to_alpha3_mapping)
In [21]:
def convert_alpha2_to_alpha3(alpha2_code):
    """Map an ISO alpha-2 country code to alpha-3; None when unknown."""
    return broadcast_alpha2_to_alpha3_mapping.value.get(alpha2_code)

# Register the function as a UDF (default StringType return)
convert_alpha2_to_alpha3_udf = F.udf(convert_alpha2_to_alpha3)
In [22]:
# Attach ISO alpha-2 and alpha-3 codes to each FIPS country row so downstream
# joins/plots (which need alpha-3) can use them.
countries = countries.withColumn('country_code_iso_alpha2', convert_fips_to_iso_udf(countries['code']))
countries = countries.withColumn('country_code_iso_alpha3', convert_alpha2_to_alpha3_udf(countries['country_code_iso_alpha2']))
show_as_html(countries, 10)
code name country_code_iso_alpha2 country_code_iso_alpha3
0 AC Antigua and Barbuda AG ATG
1 AE United Arab Emirates AE ARE
2 AF Afghanistan AF AFG
3 AG Algeria DZ DZA
4 AJ Azerbaijan AZ AZE
5 AL Albania AL ALB
6 AM Armenia AM ARM
7 AO Angola AO AGO
8 AQ American Samoa [United States] AS ASM
9 AR Argentina AR ARG
In [23]:
import seaborn as sns

def get_data_for_plot(element, average_data):
    """Collapse per-date average temperatures into a per-year series for plotting.

    Args:
        element (str): column name to hold the yearly average (e.g. 'TMIN').
        average_data (pyspark.sql.DataFrame): dataframe with 'date' and
            'average_temperature' columns.

    Returns:
        tuple: (dates, values) — a pandas datetime Series (one 01-January
        entry per year) and the yearly mean temperature divided by 10
        (GHCN stores temperatures in tenths of a degree C).
    """

    yearly = (
        average_data
        .withColumn('year', F.year('date'))
        .groupBy('year')
        .agg(
            F.avg('average_temperature').alias(element),
            F.count('average_temperature').alias('observation_count'),
        )
        .orderBy(F.col('year'))
        # Anchor each year at 01 January so the x-axis is a real date.
        .withColumn('date', F.concat(F.col('year'), F.lit('-'), F.lit('01'), F.lit('-'), F.lit('01')))
    )

    # Collect once: the original triggered two separate Spark jobs (one
    # toPandas per column), doubling the work for identical results.
    pdf = yearly.select('date', element).toPandas()
    dates = pd.to_datetime(pdf['date'])
    values = pdf[element] / 10

    return dates, values

def initialise_plot(standard = True):
    """
    Create a blank plot for later use

    standard=True gives a 10x6 figure; False gives the larger 20x12 size
    used for the per-station plots. Returns the pyplot module for chaining.
    """
    figure_size = (10, 6) if standard else (20, 12)
    plt.figure(figsize=figure_size)

    plt.xlabel('Year')
    plt.ylabel('Temperature (C)')
    plt.grid(True)
    plt.xticks(rotation=45)

    return plt

def plot_temperature_data(elements, station_codes, daily, country_name="New Zealand"):
    """
    Loop through elements and provide separate plots for minimum and maximum
    temperatures by station if station_codes are provided, otherwise create
    one plot containing the overall average maximum and minimum temperatures

    Args:
        elements (dict): element code -> friendly name, e.g. {'TMIN': 'Minimum'}.
        station_codes (dict): station id -> station name; empty/falsy to plot
            one country-wide average figure instead of per-station figures.
        daily (pyspark.sql.DataFrame): daily observations with 'id', 'date',
            'element' and 'value' columns.
        country_name (str): label used in plot titles (default: "New Zealand").

    Returns:
        list: the matplotlib plot objects produced (one per figure).
    """

    plots = []
    plt = None
    for element, friendly_name in elements.items():
        # Filter data for the current element.
        # BUG FIX: previously filtered on the module-level `nz_daily`
        # dataframe, silently ignoring the `daily` argument.
        filtered_data = daily.filter(daily['element'] == element)
        # If station codes list is empty, plot average for the country by grouping by date
        if not station_codes:
            # All elements share a single figure; initialise it only once.
            if plt is None:
                plt = initialise_plot()
            # Group and calculate average temperature by date
            average_data = filtered_data.groupBy('date').agg(F.avg('value').alias('average_temperature')).orderBy('date')
            dates, values = get_data_for_plot(element, average_data)
            plt.plot(dates, values, marker='o', linestyle='-')
            plt.title(f'{country_name} Temperature over time')
        else:
            average_data = filtered_data.groupBy('id', 'date').agg(F.avg('value').alias('average_temperature'))
            palette = sns.color_palette("husl", len(station_codes))
            legend_entries = []
            # One large figure per element when plotting by station.
            plt = initialise_plot(False)
            for i, (station_id, name) in enumerate(station_codes.items()):
                filtered_average_data = average_data.filter(average_data['id'] == station_id)
                dates, values = get_data_for_plot(element, filtered_average_data)
                plt.plot(dates, values, marker='o', linestyle='-', color=palette[i])
                legend_entries.append(plt.Line2D([], [], color=palette[i], label=name.strip()))
            plt.title(f'{country_name} - Average {friendly_name} Temperature over time by Station')
            plt.legend(handles=legend_entries)
            plots.append(plt)

    # The shared country-average figure is appended once, after every element
    # has been drawn onto it.
    if not station_codes:
        plots.append(plt)

    return plots

# Friendly names for the GHCN temperature elements being plotted.
elements = {'TMIN': 'Minimum', 'TMAX': 'Maximum'}
# Country-wide average: an empty station list yields a single shared figure.
overall_average_plot = plot_temperature_data(elements, [], nz_daily)
overall_average_plot[0].show()

# Build {station id -> station name} for every distinct NZ station.
nz_stations = stations.filter(stations['station_country_code'] == 'NZ')
distinct_nz_stations = nz_stations.select('id', 'name').distinct()
nz_stations_dict = dict(distinct_nz_stations.rdd.map(lambda row: (row['id'], row['name'])).collect())

# Per-station figures: one figure per element, one line per station.
stations_average_plot = plot_temperature_data(elements, nz_stations_dict, nz_daily)
for plot in stations_average_plot:
    plot.show()
In [24]:
#https://plotly.com/python/choropleth-maps/

# Country-years with under 5% of the expected daily observations are treated
# as too sparse to map with confidence.
confidence_level_cutoff = 0.05
# Find the records with the highest scaled_avg_rainfall
highest_scaled_rainfall_record = rainfall.orderBy(F.desc('scaled_avg_rainfall')).limit(10)
highest_scaled_rainfall_record_with_countries = highest_scaled_rainfall_record.join(countries, highest_scaled_rainfall_record['code'] == countries['code'], 'inner')

# Display the records
show_as_html(highest_scaled_rainfall_record_with_countries.orderBy(F.desc('scaled_avg_rainfall')))

# Find the records with the highest average_rainfall
highest_avg_rainfall_record = rainfall.orderBy(F.desc('average_rainfall')).limit(10)
highest_avg_rainfall_record_with_countries = highest_avg_rainfall_record.join(countries, highest_avg_rainfall_record['code'] == countries['code'], 'inner')

# Display the record
show_as_html(highest_avg_rainfall_record_with_countries.orderBy(F.desc('average_rainfall')))

# Keep only 2023 rows with enough coverage; the /10 converts what appear to
# be GHCN tenths-of-mm values to mm — NOTE(review): confirm units upstream.
rainfall_filtered = rainfall.filter((rainfall['year'] == '2023') & (rainfall['scaling_factor'] > confidence_level_cutoff))
rainfall_filtered = rainfall_filtered.withColumn('scaled_avg_rainfallmm', F.col('scaled_avg_rainfall') / 10)
rainfall_filtered = rainfall_filtered.withColumn('average_rainfallmm', F.col('average_rainfall') / 10)
joined_data = rainfall_filtered.join(countries, rainfall_filtered['code'] == countries['code'], 'inner')
# Create a new dataframe for records with scaling_factor <= 0.05
low_confidence = rainfall.filter((rainfall['year'] == '2023') & (rainfall['scaling_factor'] <= confidence_level_cutoff))
low_confidence_countries = low_confidence.join(countries, low_confidence['code'] == countries['code'], 'inner')
show_as_html(low_confidence_countries, 100)

# Bring the mappable subset to the driver for plotly.
rainfall_filtered_pd = joined_data.toPandas()

# Upper bound for the choropleth colour scale.
max_rainfall = rainfall_filtered_pd['average_rainfallmm'].max()
year code total_rainfall station_count data_point_count expected_data_points average_rainfall scaling_factor scaled_avg_rainfall code name country_code_iso_alpha2 country_code_iso_alpha3
0 2000 EK 4361.0 1 1 366 4361.0 0.002732 1.596126e+06 EK Equatorial Guinea GQ GNQ
1 1975 DR 3414.0 1 1 365 3414.0 0.002740 1.246110e+06 DR Dominican Republic DO DOM
2 1974 LA 4961.0 1 2 365 4961.0 0.005479 9.053825e+05 LA Laos LA LAO
3 1978 BH 15713.0 1 7 365 15713.0 0.019178 8.193207e+05 BH Belize BZ BLZ
4 1979 NN 19670.0 1 10 365 19670.0 0.027397 7.179550e+05 NN Sint Maarten SX SXM
5 1974 CS 3640.0 1 2 365 3640.0 0.005479 6.643000e+05 CS Costa Rica CR CRI
6 1979 BH 19311.0 1 11 365 19311.0 0.030137 6.407741e+05 BH Belize BZ BLZ
7 1973 NS 5130.0 1 3 365 5130.0 0.008219 6.241500e+05 NS Suriname SR SUR
8 1978 UC 43551.0 1 26 365 43551.0 0.071233 6.113890e+05 UC Curacao CW CUW
9 1977 BH 10792.0 1 7 365 10792.0 0.019178 5.627257e+05 BH Belize BZ BLZ
year code total_rainfall station_count data_point_count expected_data_points average_rainfall scaling_factor scaled_avg_rainfall code name country_code_iso_alpha2 country_code_iso_alpha3
0 1967 NC 165890.0 1 365 365 165890.0 1.0 165890.0 NC New Caledonia [France] NC NCL
1 1956 NC 153010.0 1 366 366 153010.0 1.0 153010.0 NC New Caledonia [France] NC NCL
2 1961 NC 150600.0 1 365 365 150600.0 1.0 150600.0 NC New Caledonia [France] NC NCL
3 1954 NC 141680.0 1 365 365 141680.0 1.0 141680.0 NC New Caledonia [France] NC NCL
4 1972 NC 134290.0 1 366 366 134290.0 1.0 134290.0 NC New Caledonia [France] NC NCL
5 1962 NC 125160.0 1 365 365 125160.0 1.0 125160.0 NC New Caledonia [France] NC NCL
6 1951 NC 112090.0 1 365 365 112090.0 1.0 112090.0 NC New Caledonia [France] NC NCL
7 1960 NC 110910.0 1 366 366 110910.0 1.0 110910.0 NC New Caledonia [France] NC NCL
8 1971 NC 109430.0 1 365 365 109430.0 1.0 109430.0 NC New Caledonia [France] NC NCL
9 1958 NC 107700.0 1 365 365 107700.0 1.0 107700.0 NC New Caledonia [France] NC NCL
year code total_rainfall station_count data_point_count expected_data_points average_rainfall scaling_factor scaled_avg_rainfall code name country_code_iso_alpha2 country_code_iso_alpha3
0 2023 ID 86264.0 89 639 32485 969.258427 0.019671 49274.428795 ID Indonesia ID IDN
1 2023 ZA 555.0 3 12 1095 185.000000 0.010959 16881.250000 ZA Zambia ZM ZMB
2 2023 NU 0.0 1 1 365 0.000000 0.002740 0.000000 NU Nicaragua NI NIC
3 2023 MR 575.0 2 23 730 287.500000 0.031507 9125.000000 MR Mauritania MR MRT
4 2023 EG 1645.0 7 71 2555 235.000000 0.027789 8456.690141 EG Egypt EG EGY
5 2023 SU 681.0 1 3 365 681.000000 0.008219 82855.000000 SU Sudan SD SDN
6 2023 CG 9404.0 6 87 2190 1567.333333 0.039726 39453.563218 CG Congo (Kinshasa) CD COD
7 2023 SL 1503.0 1 15 365 1503.000000 0.041096 36573.000000 SL Sierra Leone SL SLE
8 2023 UZ 17278.0 26 293 9490 664.538462 0.030875 21523.788396 UZ Uzbekistan UZ UZB
9 2023 NI 16553.0 8 139 2920 2069.125000 0.047603 43466.510791 NI Nigeria NG NGA
10 2023 SA 17861.0 23 167 8395 776.565217 0.019893 39037.514970 SA Saudi Arabia SA SAU
11 2023 SY 4034.0 5 77 1825 806.800000 0.042192 19122.207792 SY Syria SY SYR
In [25]:
# Choropleth of 2023 annual rainfall, keyed by ISO alpha-3 country code.
fig = go.Figure(data=go.Choropleth(
    locations = rainfall_filtered_pd['country_code_iso_alpha3'],
    z = rainfall_filtered_pd['average_rainfallmm'],
    text = rainfall_filtered_pd['name'],
    colorscale = 'Blues',
    hovertemplate='%{text}: %{z:.2f} mm',  # Format hover text with 2 decimal places and "mm"
    autocolorscale=False,
    reversescale=False,
    marker_line_color='darkgray',
    marker_line_width=0.5,
    colorbar_title = 'Annual Rainfall (mm)',
    zmin=0, 
    zmax=max_rainfall,
))

fig.update_layout(
    title_text='2023 Annual Rainfall',
    geo=dict(
        showframe=False,
        showcoastlines=True,  # Show coastlines
        projection_type='equirectangular'
    ),
    # Data-source attribution rendered inside the figure.
    annotations = [dict(
        x=0.55,
        y=0.1,
        xref='paper',
        yref='paper',
        text='Source: <a href="https://www.ncei.noaa.gov/products/land-based-station/global-historical-climatology-network-daily">\
            GHCN Daily Data</a>',
        showarrow = False
    )],
    # NOTE(review): coloraxis_colorbar applies only to traces bound to a
    # shared coloraxis; this Choropleth uses its own per-trace colorbar
    # (colorbar_title above), so this block is likely a no-op — confirm.
    coloraxis_colorbar=dict(
        title='Annual Rainfall (mm)',  # Colorbar title
        lenmode='pixels',
        len=400,  # Adjust colorbar length
    )
)

fig.show()
In [26]:
os.system('jupyter nbconvert --to html_embed Plots.ipynb')
Out[26]:
32512
In [27]:
# Run this cell before closing the notebook or kill your spark application by hand using the link in the Spark UI
# (releases the executors/cores this notebook reserved on the shared cluster)

stop_spark()

Spark

The spark session is stopped, confirm that sjb131 (jupyter) is under the completed applications section in the Spark UI.